/*###################################################################
  > File Name: lsv/src/bitmap/lsv_bitmap_hash_cache2.c
  > Author: Vurtune
  > Mail: vurtune@foxmail.com
  > Created Time: Sat 07 Apr 2018 10:58:22 PM PDT
###################################################################*/
#include "config.h"
#include "lich_api.h"

#include <pthread.h>
#include <sys/time.h>
#include <errno.h>

#include "hash_table.h"
#include "lsv_conf.h"
#include "lsv_bitmap_cache_impl.h"

#define MAX_PAGES (256 * 100)

struct __bmap_cache {
        //sy_spinlock_t           lock;
        sy_rwlock_t             lock;
        void                    *volume_context;

        hashtable_t             ht;
        struct list_head        lru;
        uint32_t                pages;
};

typedef struct {
        struct list_head        hook;

        sy_rwlock_t             lock;

        uint64_t                page_idx;
        void                    *page_data;
} lsv_cache_entry_t;

typedef struct {
        uint32_t                offset;
        uint32_t                count;
} alct_arg_t;

#define CACHE_PAGE_SIZE 4096

#define CACHE_GET(e)                    sy_rwlock_rdlock(&e->lock)
#define CACHE_RLS(e)                    sy_rwlock_unlock(&e->lock)
#define CACHE_MDF(e)                    sy_rwlock_wrlock(&e->lock)
#define CACHE_DRP(e)                    sy_rwlock_trywrlock(&e->lock)  // if try lock success, without other lock in queue

#define CACHE_DBG 0

int lsv_bitmap_read_chunk(void *volume_context, uint32_t vvol_id, uint32_t chunk_id, uint32_t chunk_off,
                          uint32_t len, void *buf);

static uint32_t lsv_hash_cache_key_func(const void *k)
{
        const uint64_t *idx = k;
        return (*idx);
}

static int lsv_hash_cache_cmp_func(const void *s1, const void *s2)
{
        const lsv_cache_entry_t *e1 = s1;
        const uint64_t *idx = s2;

        if( e1->page_idx < *idx)
                return -1;
        else if (e1->page_idx > *idx)
                return 1;

        return 0;
}

static int lsv_cache_api_init(struct __bmap_cache **_cache, const char *name, void *volume_context)
{
        int ret;

        struct __bmap_cache *cache = xmalloc(sizeof(struct __bmap_cache));
        if (unlikely(!cache)) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        //sy_spin_init(&cache->lock);
        sy_rwlock_init(&cache->lock, "bmcache.lock");

        cache->pages = 0;
        cache->volume_context = volume_context;

        INIT_LIST_HEAD(&cache->lru);
        cache->ht = hash_create_table(lsv_hash_cache_cmp_func, lsv_hash_cache_key_func, name);

        *_cache = cache;
        lsv_volume_proto_t *lsv_vol = (lsv_volume_proto_t *)volume_context;
        LSV_DBUG("vol "CHKID_FORMAT" hash cache created!\n", CHKID_ARG(&lsv_vol->volume_proto->chkid));

        return 0;
err_ret:
        return ret;
}

static int lsv_hash_cache_deinit(struct __bmap_cache *cache)
{
        (void) cache;
        YASSERT(0);

        LSV_DBUG("vol hash cache destroy\n");
        return 0;
}

static void hash_cache_paged_lru(struct __bmap_cache *cache)
{
        int ret;
        lsv_cache_entry_t *e2, *first;

        // in cache -> wlock
        if (likely(cache->pages > MAX_PAGES)) {

                DWARN("lsv hash cache lru, pages %u\n", cache->pages);

                first = list_entry(cache->lru.next, lsv_cache_entry_t, hook);

                ret = CACHE_DRP(first);
                if (unlikely(ret)) {
                        DWARN("cache last member could not drop, pages %u\n", cache->pages);
                        YASSERT(cache->pages < MAX_PAGES * 2);
                }

                ret = hash_table_remove(cache->ht, (const void *)&first->page_idx, (void **)&e2);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                list_del((struct list_head *)first);

                YASSERT(e2 == first);

                xfree((void *)e2->page_data);
                xfree((void *)e2);
        }
}

int hash_cache_paged_load(struct __bmap_cache *cache, uint64_t page_idx, lsv_cache_entry_t **ent)
{
        int ret;
        lsv_cache_entry_t *_e, *tmp;
        char *pname = NULL;

        _e = xmalloc(sizeof(lsv_cache_entry_t));
        if (unlikely(!_e)) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        sy_rwlock_init(&_e->lock, pname);
        _e->page_idx = page_idx;

        uint32_t chunk_id = (uint32_t)((page_idx * CACHE_PAGE_SIZE) >> 32);
        uint32_t chunk_off = (uint32_t)(page_idx * CACHE_PAGE_SIZE);

        _e->page_data = xmalloc(CACHE_PAGE_SIZE);
        if (unlikely(!_e->page_data)) {
                ret = ENOMEM;
                GOTO(err_free, ret);
        }

        ret = lsv_bitmap_read_chunk(cache->volume_context, 0, chunk_id, chunk_off, CACHE_PAGE_SIZE, _e->page_data);
        if (unlikely(ret))
                GOTO(err_free2, ret);

        ret = sy_rwlock_wrlock(&cache->lock);
        if (unlikely(ret))
                GOTO(err_free2, ret);

        tmp = hash_table_find(cache->ht, (void *)&page_idx);
        if (unlikely(tmp)) {
                DWARN("hash_cache load conflict !!!, page_idx %ju\n", page_idx);
                YASSERT(!memcmp(_e->page_data, tmp->page_data, CACHE_PAGE_SIZE));
                xfree((void *)_e->page_data);
                xfree((void *)_e);
                _e = tmp;
        } else {
                ret = hash_table_insert(cache->ht, _e, &_e->page_idx, 0);
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                list_add_tail(&_e->hook, &cache->lru);
        }

        hash_cache_paged_lru(cache);

        if (likely(ent)) {
                *ent = _e;
        }

        sy_rwlock_unlock(&cache->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&cache->lock);
err_free2:
        xfree((void *)_e->page_data);
err_free:
        xfree((void *)_e);
err_ret:
        return ret;

}

int lsv_volume_chunk_malloc(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t *chunk_id);

static int hash_cache_allocate_page(lsv_volume_proto_t *lsv_info, uint32_t *chkid, uint32_t *chkoff)
{
        int ret;

        // TODO big lock
        ret = lsv_wrlock(&lsv_info->row3_tail.tail_lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if(!lsv_info->row3_tail.chunk_id || lsv_info->row3_tail.chunk_off == LSV_CHUNK_SIZE) {
                ret = lsv_volume_chunk_malloc(lsv_info, LSV_BITMAP_STORAGE_TYPE, &lsv_info->row3_tail.chunk_id);
                if (unlikely(ret)) {
                        GOTO(err_release, ret);
                }

                LSV_DBUG("malloc chunk_id %u\n", lsv_info->row3_tail.chunk_id);
                lsv_info->row3_tail.chunk_off = 0;
        }

        *chkid = lsv_info->row3_tail.chunk_id;
        *chkoff = lsv_info->row3_tail.chunk_off;

        lsv_info->row3_tail.chunk_off += LSV_PAGE_SIZE;

        lsv_unlock(&lsv_info->row3_tail.tail_lock);


        DBUG("allocate page id %u %u\n", lsv_info->row3_tail.chunk_id, lsv_info->row3_tail.chunk_off);

        return 0;
err_release:
        lsv_unlock(&lsv_info->row3_tail.tail_lock);
err_ret:
        return ret;
}

struct lsv_bitmap_context * lsv_bitmap_volume_to_node(void * volume_context);
int lsv_bitmap_write_chunk(void *volume_context, uint32_t chunk_id, uint32_t chunk_off, uint32_t len, void *buf);

int hash_cache_allocate_range(struct __bmap_cache *cache, lsv_cache_entry_t *e, const alct_arg_t *alct_range)
{
        int ret;
        uint32_t chkid, page_off;
        lsv_volume_proto_t *lsv_info = cache->volume_context;

        DBUG("alct range: %u %u\n", alct_range->offset, alct_range->count);
        YASSERT(alct_range->count <= LSV_PAGE_SIZE / sizeof(lsv_bitmap_unit_t));

        uint32_t count = alct_range->count;

        //big lock
        ret = CACHE_MDF(e);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        void *_page_data = NULL;
        lsv_bitmap_unit_t *unit = e->page_data + (alct_range->offset * sizeof(lsv_bitmap_unit_t));

        int changed = 0; int failed = 0;
        while(likely(count)) {
                if (unlikely(0 == unit->chunk_id)) {

                        if (unlikely(!_page_data)) {
                                _page_data = xmalloc(CACHE_PAGE_SIZE);
                                memcpy(_page_data, e->page_data, CACHE_PAGE_SIZE);
                        }

                        ret = hash_cache_allocate_page(cache->volume_context, &chkid, &page_off);
                        if (unlikely(ret)){
                                DERROR("allocate pages failed, ret %d, need gc!\n", ret);
                                failed = ret;
                                break;
                        }

                        unit->chunk_id = chkid;
                        unit->chunk_page_off = page_off / LSV_PAGE_SIZE;
                        changed ++;
                        LSV_DBUG("alct unit filled chkid: %u page_off:%u\n", unit->chunk_id, unit->chunk_page_off);
                }
                count --;
                unit ++;
        }

        if (unlikely(changed && !failed)) {
                struct lsv_bitmap_context * node = lsv_bitmap_volume_to_node(lsv_info);
                uint32_t chunk_id = (uint32_t)((e->page_idx * CACHE_PAGE_SIZE) >> 32);
                uint32_t chunk_off = (uint32_t)(e->page_idx * CACHE_PAGE_SIZE);

                ret =  lsv_bitmap_write_chunk(node->volume_context, chunk_id, chunk_off, CACHE_PAGE_SIZE, _page_data);
                if (unlikely(ret)) {
                        DERROR("write failed, need drop or reload this cache!!!\n");
                        GOTO(err_release, ret);
                }

        }

        if (unlikely(failed)) {
                //rollback
                memcpy(e->page_data, _page_data, CACHE_PAGE_SIZE);
                ret = failed;
                GOTO(err_release, ret);
        }

        if (unlikely(_page_data))
                xfree(_page_data);

        CACHE_RLS(e);

        return 0;
err_release:
        if (unlikely(_page_data))
                xfree(_page_data);

        CACHE_RLS(e);
err_ret:
        return ret;
}

// page_idx = chunk_id << 32 | page_off
int hash_cache_paged_get(struct __bmap_cache *cache, uint64_t page_idx, alct_arg_t *alct_range, lsv_bitmap_unit_t **out)
{
        int ret;
        lsv_cache_entry_t *e;

        //Because move tail. TODO need a head lock
        ret = sy_rwlock_rdlock(&cache->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        e = hash_table_find(cache->ht, (void *)&page_idx);
        if(unlikely(e)) {
                /* just for lru. */
                list_move_tail(&cache->lru, &e->hook);
        }

        sy_rwlock_unlock(&cache->lock);

        if (likely(!e)) {
                /* load page && lru. */
                ret = hash_cache_paged_load(cache, page_idx, &e);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        if (unlikely(alct_range)) {
                ret = hash_cache_allocate_range(cache, e, alct_range);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

#if CACHE_DBG
        {
                uint32_t chunk_id = (uint32_t)((page_idx * CACHE_PAGE_SIZE) >> 32);
                uint32_t chunk_off = (uint32_t)(page_idx * CACHE_PAGE_SIZE);

                lsv_bitmap_unit_t *unit = e->page_data;
                for (int i =0; i < CACHE_PAGE_SIZE;) {
                        if (unit->chunk_id)
                                LSV_DBUG("%d. cache id %u cache off %u, uint %u %u\n",
                                                i/8,
                                                chunk_id,
                                                chunk_off,
                                                unit->chunk_id,
                                                unit->chunk_page_off);
                        i += sizeof(lsv_bitmap_unit_t);
                        unit ++;
                }
        }
#endif

        ret = CACHE_GET(e);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *out = (lsv_bitmap_unit_t *)e->page_data;

        return 0;
err_ret:
        return ret;
}

static int hash_cache_paged_release(struct __bmap_cache *cache, uint64_t page_idx)
{
        int ret;
        lsv_cache_entry_t *e;

        ret = sy_rwlock_rdlock(&cache->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        e = hash_table_find(cache->ht, (void *)&page_idx);
        if(unlikely(!e)) {
                YASSERT(0 && "why? nf?\n");
        }

        CACHE_RLS(e);

        sy_rwlock_unlock(&cache->lock);

        return 0;
err_ret:
        return ret;
}

int hash_cache_paged_update(struct __bmap_cache *cache, uint64_t page_idx, void *in)
{
        int ret;
        lsv_cache_entry_t *e;

        ret = sy_rwlock_rdlock(&cache->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        e = hash_table_find(cache->ht, (void *)&page_idx);
        if(unlikely(!e)) {
                YASSERT(0 && "what? not found?\n");
        }

        ret = CACHE_MDF(e);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        xfree((void *)e->page_data);
        e->page_data = in;

        CACHE_RLS(e);

        sy_rwlock_unlock(&cache->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&cache->lock);
err_ret:
        return ret;
}

static inline uint64_t bmrang2pageid(uint32_t bm_chkid, uint32_t bm_off) {
        return (((uint64_t)bm_chkid<<32 | bm_off) / CACHE_PAGE_SIZE);
}

static inline int cache_paged_align(uint32_t bmchk_off, uint32_t bmchk_len, uint32_t *off_align, uint32_t *len_align)
{
        uint32_t off, len;

        YASSERT(bmchk_off + bmchk_len <= CHUNK_SIZE);

        len = bmchk_len;

        if (likely(bmchk_off % LSV_PAGE_SIZE)) {
                off = bmchk_off - (bmchk_off % LSV_PAGE_SIZE);
                len += bmchk_off % LSV_PAGE_SIZE;
        } else {
                off = bmchk_off;
        }

        if (len % LSV_PAGE_SIZE) {
                len = len + (LSV_PAGE_SIZE - (len % LSV_PAGE_SIZE));
        }

        YASSERT(off + len <= CHUNK_SIZE && \
                        !(len % LSV_PAGE_SIZE) && \
                        !(off % LSV_PAGE_SIZE));


        *off_align = off; *len_align = len;
        return 0;
}

/*
 * chunk_id: bitmap chunk id
 * chunk_off: bitmap chunk offset
 * allocate: allocate for write
 * bm_buf: out param
 */
int lsv_cache_api_get(struct __bmap_cache *cache, uint32_t bm_chkid, uint32_t bm_chunk_off, uint32_t length, int allocate, lsv_bitmap_unit_t *bm_buf)
{
        int ret;
        alct_arg_t alct;
        uint32_t off_align, len_align;
        lsv_bitmap_unit_t *bitmap_unit;

        cache_paged_align(bm_chunk_off, length, &off_align, &len_align);

        uint32_t valid_off, valid_len, valid_len2 = 0, valid_left = length;

        while(len_align) {

                valid_off = off_align < bm_chunk_off ? bm_chunk_off : off_align;
                valid_len = off_align + LSV_PAGE_SIZE < bm_chunk_off + length ? \
                            LSV_PAGE_SIZE - valid_off + off_align : \
                            bm_chunk_off + length - valid_off;
                valid_left -= valid_len;

                //写时预分配
                if (allocate) {
                        alct.offset = (valid_off - off_align) / sizeof(lsv_bitmap_unit_t);
                        alct.count = valid_len / sizeof(lsv_bitmap_unit_t);
                }

                DBUG("IO chkid %u "
                                "chkoff %u len %u, "
                                "align %u %u, "
                                "alct[%d] %u %u\n",
                                bm_chkid,
                                bm_chunk_off, length,
                                off_align, len_align,
                                allocate, alct.offset, alct.count
                     );

                ret = hash_cache_paged_get(cache,
                                bmrang2pageid(bm_chkid, off_align),
                                allocate ? &alct : NULL,
                                &bitmap_unit);

                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("bitmap cpy off_align %u len_align %u, valid_off %u valid_len %u\n",
                                off_align,
                                len_align,
                                valid_off,
                                valid_len);

                //TODO
                memcpy((char *)bm_buf + valid_len2, \
                                (char *)bitmap_unit +  valid_off - off_align, \
                                valid_len);

                valid_len2 += valid_len;
                len_align -= CACHE_PAGE_SIZE;
                off_align += CACHE_PAGE_SIZE;
        }

        YASSERT(valid_left == 0);
        YASSERT(valid_len2 == length);

        return 0;
err_ret:
        return ret;
}

static int lsv_cache_api_release(bm_cache_t cache, uint32_t bm_chkid, uint32_t bm_chunk_off, uint32_t length)
{
        int ret;
        uint32_t off_align, len_align;

        cache_paged_align(bm_chunk_off, length, &off_align, &len_align);

        while(len_align) {

                ret = hash_cache_paged_release(cache,
                                       bmrang2pageid(bm_chkid, off_align));

                if (unlikely(ret))
                        GOTO(err_ret, ret);

                len_align -= CACHE_PAGE_SIZE;
                off_align += CACHE_PAGE_SIZE;
        }

        return 0;
err_ret:
        return ret;
}

bm_cache_api_t hash_cache_api = {
        .init = lsv_cache_api_init,
        .deinit = lsv_hash_cache_deinit,

        .get = lsv_cache_api_get,
        .release = lsv_cache_api_release,

        .update = NULL,
};

bm_cache_api_t *cache_api = &hash_cache_api;
